from django.db import connection
from django.db.models import CharField, Max
from django.db.models.functions import Lower
from django.test import TestCase, skipUnlessDBFeature
from django.test.utils import register_lookup

from .models import Celebrity, Fan, Staff, StaffTag, Tag


@skipUnlessDBFeature('can_distinct_on_fields')
@skipUnlessDBFeature('supports_nullable_unique_constraints')
class DistinctOnTests(TestCase):
    @classmethod
    def setUpTestData(cls):
        cls.t1 = Tag.objects.create(name='t1')
        cls.t2 = Tag.objects.create(name='t2', parent=cls.t1)
        cls.t3 = Tag.objects.create(name='t3', parent=cls.t1)
        cls.t4 = Tag.objects.create(name='t4', parent=cls.t3)
        cls.t5 = Tag.objects.create(name='t5', parent=cls.t3)

        cls.p1_o1 = Staff.objects.create(id=1, name="p1", organisation="o1")
        cls.p2_o1 = Staff.objects.create(id=2, name="p2", organisation="o1")
        cls.p3_o1 = Staff.objects.create(id=3, name="p3", organisation="o1")
        cls.p1_o2 = Staff.objects.create(id=4, name="p1", organisation="o2")
        cls.p1_o1.coworkers.add(cls.p2_o1, cls.p3_o1)
        cls.st1 = StaffTag.objects.create(staff=cls.p1_o1, tag=cls.t1)
        StaffTag.objects.create(staff=cls.p1_o1, tag=cls.t1)

        cls.celeb1 = Celebrity.objects.create(name="c1")
        cls.celeb2 = Celebrity.objects.create(name="c2")

        cls.fan1 = Fan.objects.create(fan_of=cls.celeb1)
        cls.fan2 = Fan.objects.create(fan_of=cls.celeb1)
        cls.fan3 = Fan.objects.create(fan_of=cls.celeb2)

    def test_basic_distinct_on(self):
        """QuerySet.distinct('field', ...) works"""
        # (qset, expected) tuples
        qsets = (
            (
                Staff.objects.distinct().order_by('name'),
                [self.p1_o1, self.p1_o2, self.p2_o1, self.p3_o1],
            ),
            (
                Staff.objects.distinct('name').order_by('name'),
                [self.p1_o1, self.p2_o1, self.p3_o1],
            ),
            (
                Staff.objects.distinct('organisation').order_by('organisation', 'name'),
                [self.p1_o1, self.p1_o2],
            ),
            (
                Staff.objects.distinct('name', 'organisation').order_by('name', 'organisation'),
                [self.p1_o1, self.p1_o2, self.p2_o1, self.p3_o1],
            ),
            (
                Celebrity.objects.filter(fan__in=[self.fan1, self.fan2, self.fan3]).distinct('name').order_by('name'),
                [self.celeb1, self.celeb2],
            ),
            # Does combining querysets work?
            (
                (Celebrity.objects.filter(fan__in=[self.fan1, self.fan2]).
                    distinct('name').order_by('name') |
                 Celebrity.objects.filter(fan__in=[self.fan3]).
                    distinct('name').order_by('name')),
                [self.celeb1, self.celeb2],
            ),
            (StaffTag.objects.distinct('staff', 'tag'), [self.st1]),
            (
                Tag.objects.order_by('parent__pk', 'pk').distinct('parent'),
                [self.t2, self.t4, self.t1]
                if connection.features.nulls_order_largest
                else [self.t1, self.t2, self.t4],
            ),
            (
                StaffTag.objects.select_related('staff').distinct('staff__name').order_by('staff__name'),
                [self.st1],
            ),
            # Fetch the alphabetically first coworker for each worker
            (
                (Staff.objects.distinct('id').order_by('id', 'coworkers__name').
                    values_list('id', 'coworkers__name')),
                [(1, 'p2'), (2, 'p1'), (3, 'p1'), (4, None)],
            ),
        )
        for qset, expected in qsets:
            self.assertSequenceEqual(qset, expected)
            self.assertEqual(qset.count(), len(expected))

        # Combining queries with non-unique query is not allowed.
        base_qs = Celebrity.objects.all()
        msg = 'Cannot combine a unique query with a non-unique query.'
        with self.assertRaisesMessage(TypeError, msg):
            base_qs.distinct('id') & base_qs
        # Combining queries with different distinct_fields is not allowed.
        msg = 'Cannot combine queries with different distinct fields.'
        with self.assertRaisesMessage(TypeError, msg):
            base_qs.distinct('id') & base_qs.distinct('name')

        # Test join unreffing
        c1 = Celebrity.objects.distinct('greatest_fan__id', 'greatest_fan__fan_of')
        self.assertIn('OUTER JOIN', str(c1.query))
        c2 = c1.distinct('pk')
        self.assertNotIn('OUTER JOIN', str(c2.query))

    def test_sliced_queryset(self):
        msg = 'Cannot create distinct fields once a slice has been taken.'
        with self.assertRaisesMessage(TypeError, msg):
            Staff.objects.all()[0:5].distinct('name')

    def test_transform(self):
        new_name = self.t1.name.upper()
        self.assertNotEqual(self.t1.name, new_name)
        Tag.objects.create(name=new_name)
        with register_lookup(CharField, Lower):
            self.assertCountEqual(
                Tag.objects.order_by().distinct('name__lower'),
                [self.t1, self.t2, self.t3, self.t4, self.t5],
            )

    def test_distinct_not_implemented_checks(self):
        # distinct + annotate not allowed
        msg = 'annotate() + distinct(fields) is not implemented.'
        with self.assertRaisesMessage(NotImplementedError, msg):
            Celebrity.objects.annotate(Max('id')).distinct('id')[0]
        with self.assertRaisesMessage(NotImplementedError, msg):
            Celebrity.objects.distinct('id').annotate(Max('id'))[0]

        # However this check is done only when the query executes, so you
        # can use distinct() to remove the fields before execution.
        Celebrity.objects.distinct('id').annotate(Max('id')).distinct()[0]
        # distinct + aggregate not allowed
        msg = 'aggregate() + distinct(fields) not implemented.'
        with self.assertRaisesMessage(NotImplementedError, msg):
            Celebrity.objects.distinct('id').aggregate(Max('id'))

    def test_distinct_on_in_ordered_subquery(self):
        qs = Staff.objects.distinct('name').order_by('name', 'id')
        qs = Staff.objects.filter(pk__in=qs).order_by('name')
        self.assertSequenceEqual(qs, [self.p1_o1, self.p2_o1, self.p3_o1])
        qs = Staff.objects.distinct('name').order_by('name', '-id')
        qs = Staff.objects.filter(pk__in=qs).order_by('name')
        self.assertSequenceEqual(qs, [self.p1_o2, self.p2_o1, self.p3_o1])

    def test_distinct_on_get_ordering_preserved(self):
        """
        Ordering shouldn't be cleared when distinct on fields are specified.
        refs #25081
        """
        staff = Staff.objects.distinct('name').order_by('name', '-organisation').get(name='p1')
        self.assertEqual(staff.organisation, 'o2')
